package org.example.project.common.middleware.service.impl;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.example.project.common.error.enums.ErrorCode;
import org.example.project.common.error.exception.ServiceException;
import org.example.project.common.middleware.service.KafkaService;
import org.example.project.common.middleware.service.dto.KafkaDTO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;

/**
 * Default {@link KafkaService} implementation that delegates every operation to the
 * injected {@link KafkaTemplate} and converts the results to strings for callers.
 *
 * @author wenxy
 * @date 2020/9/29
 */
@Service("defaultKafkaService")
@AutoConfigureAfter(KafkaAutoConfiguration.class)
@ConditionalOnClass(KafkaTemplate.class)
public class DefaultKafkaServiceImpl implements KafkaService<Object, Object> {
    @Autowired
    KafkaTemplate kafkaTemplate;

    /**
     * Sends the record produced by {@code kafkaDTO.toProduceRecord()} and blocks until the
     * returned future completes.
     *
     * @param kafkaDTO source of the record to send
     * @return the {@code toString()} form of the completed {@link SendResult}
     * @throws ServiceException with {@link ErrorCode#C0120} when the wait is interrupted or
     *                          the send future completes exceptionally
     */
    @Override
    public String send(KafkaDTO kafkaDTO) {
        try {
            ListenableFuture<SendResult> future = kafkaTemplate.send(kafkaDTO.toProduceRecord());
            return future.get().toString();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe
            // that this thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new ServiceException(ErrorCode.C0120, e);
        } catch (ExecutionException e) {
            throw new ServiceException(ErrorCode.C0120, e);
        }
    }

    /**
     * Looks up the partitions of a topic through the template.
     *
     * @param topic topic name passed to {@code KafkaTemplate.partitionsFor}
     * @return the {@code toString()} form of each {@link PartitionInfo}, in the order returned
     */
    @Override
    public List<String> partitionsFor(String topic) {
        List<PartitionInfo> partitionInfos = kafkaTemplate.partitionsFor(topic);
        return partitionInfos.stream()
                .map(PartitionInfo::toString)
                .collect(Collectors.toList());
    }

    /**
     * Exposes the template's metrics as a string-keyed map.
     *
     * @return a map from each {@link MetricName}'s {@code toString()} to a reflective
     *         short-prefix-style string rendering of the corresponding {@link Metric}
     */
    @Override
    public Map<String, Object> metrics() {
        Map<MetricName, ? extends Metric> metricNameMap = kafkaTemplate.metrics();
        return metricNameMap.entrySet().stream().collect(Collectors.toMap(
                metricNameEntry -> metricNameEntry.getKey().toString(),
                metricNameEntry -> ToStringBuilder.reflectionToString(metricNameEntry.getValue(), SHORT_PREFIX_STYLE)));
    }

    /**
     * Forwards the given offsets and consumer group id to
     * {@code KafkaTemplate.sendOffsetsToTransaction}.
     *
     * @param offsets         offsets per topic partition to forward
     * @param consumerGroupId consumer group id to forward
     */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
        kafkaTemplate.sendOffsetsToTransaction(offsets, consumerGroupId);
    }
}
